Skip to content

Conversation

NathanFlurry
Copy link
Member

Fixes RVT-5128

Copy link

linear bot commented Sep 6, 2025

Copy link

vercel bot commented Sep 6, 2025

The latest updates on your projects. Learn more about Vercel for GitHub.

Project Deployment Preview Comments Updated (UTC)
rivet-site Ready Ready Preview Comment Sep 9, 2025 3:44am
1 Skipped Deployment
Project Deployment Preview Comments Updated (UTC)
rivet-studio Ignored Ignored Preview Sep 9, 2025 3:44am

Copy link
Member Author

NathanFlurry commented Sep 6, 2025

Warning

This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.
Learn more


How to use the Graphite Merge Queue

Add the label merge-queue to this PR to add it to the merge queue.

You must have a Graphite account in order to use the merge queue. Sign up using this link.

An organization admin has enabled the Graphite Merge Queue in this repository.

Please do not merge from GitHub as this will restart CI on PRs being processed by the merge queue.

This stack of pull requests is managed by Graphite. Learn more about stacking.

Copy link

claude bot commented Sep 6, 2025

Claude encountered an error —— View job


I'll analyze this and get back to you.

Copy link

pkg-pr-new bot commented Sep 6, 2025

Open in StackBlitz

npm i https://pkg.pr.new/rivet-gg/engine/@rivetkit/engine-runner@2878
npm i https://pkg.pr.new/rivet-gg/engine/@rivetkit/engine-runner-protocol@2878
npm i https://pkg.pr.new/rivet-gg/engine/@rivetkit/engine-tunnel-protocol@2878

commit: d6b0b65

},
// Per-iteration teardown: drop the per-iteration subscription to unsubscribe
|_, mut sub| async move {
drop(&mut sub);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The drop(&mut sub) expression doesn't actually drop the subscriber value, only the mutable reference to it. This means the subscription will remain active since the actual subscriber isn't being dropped. To properly clean up resources and trigger unsubscription, this should be changed to drop(sub) without the reference operator.

Suggested change
drop(&mut sub);
drop(sub);

Spotted by Diamond

Fix in Graphite


Is this helpful? React 👍 or 👎 to let us know.

Copy link

claude bot commented Sep 8, 2025

Claude encountered an error —— View job


I'll analyze this and get back to you.

Copy link

claude bot commented Sep 8, 2025

Claude encountered an error —— View job


I'll analyze this and get back to you.

@NathanFlurry NathanFlurry force-pushed the 09-05-chore_ups_add_message_chunking_ups_protocol branch from bf98d22 to c9cff29 Compare September 9, 2025 03:30
@NathanFlurry NathanFlurry force-pushed the 09-06-chore_ups_add_benches branch from 3586f1c to f68b17f Compare September 9, 2025 03:30
Copy link

claude bot commented Sep 9, 2025

Claude encountered an error —— View job


I'll analyze this and get back to you.

Copy link

claude bot commented Sep 9, 2025

Claude encountered an error —— View job


I'll analyze this and get back to you.

Comment on lines +409 to +423
tokio::spawn(async move {
// Use subscribe_and_wait_propagate to ensure subscription is ready
let mut sub = subscribe_and_wait_propagate(&subscriber_clone, &subject_clone)
.await
.unwrap();
let _ = ready_tx.send(());
loop {
match sub.next().await {
std::result::Result::Ok(NextOutput::Message(msg)) => {
let _ = msg.reply(&msg.payload).await;
}
_ => break,
}
}
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The spawned task in the request benchmark setup creates an infinite loop that continues running after the benchmark completes. This can lead to resource leakage as these background tasks accumulate across benchmark runs. Consider either:

  1. Capturing the JoinHandle from tokio::spawn() and aborting it during teardown
  2. Using a channel or atomic flag to signal the task to exit gracefully when the benchmark completes

This ensures proper cleanup of resources between benchmark iterations and prevents potential memory/thread leaks during extended benchmark runs.

Suggested change
tokio::spawn(async move {
// Use subscribe_and_wait_propagate to ensure subscription is ready
let mut sub = subscribe_and_wait_propagate(&subscriber_clone, &subject_clone)
.await
.unwrap();
let _ = ready_tx.send(());
loop {
match sub.next().await {
std::result::Result::Ok(NextOutput::Message(msg)) => {
let _ = msg.reply(&msg.payload).await;
}
_ => break,
}
}
});
let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
let handle = tokio::spawn(async move {
// Use subscribe_and_wait_propagate to ensure subscription is ready
let mut sub = subscribe_and_wait_propagate(&subscriber_clone, &subject_clone)
.await
.unwrap();
let _ = ready_tx.send(());
loop {
tokio::select! {
msg_result = sub.next() => {
match msg_result {
std::result::Result::Ok(NextOutput::Message(msg)) => {
let _ = msg.reply(&msg.payload).await;
}
_ => break,
}
}
_ = &mut shutdown_rx => {
break;
}
}
}
});

Spotted by Diamond

Fix in Graphite


Is this helpful? React 👍 or 👎 to let us know.

Comment on lines +192 to +193
let timeout_result =
tokio::time::timeout(Duration::from_millis(10), guard.next()).await;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The 10ms timeout creates a potential race condition in the benchmark. Under high load or network latency, legitimate messages might arrive after the timeout expires, causing the benchmark to undercount messages and potentially report inaccurate results. Consider either:

  1. Using a longer, more conservative timeout value (e.g., 100ms)
  2. Implementing a more deterministic approach that doesn't rely on timing
  3. Adding instrumentation to detect and report when messages might have been missed

This is particularly important for benchmarks that need to produce consistent, reproducible results across different environments.

Suggested change
let timeout_result =
tokio::time::timeout(Duration::from_millis(10), guard.next()).await;
let timeout_result =
tokio::time::timeout(Duration::from_millis(100), guard.next()).await;

Spotted by Diamond

Fix in Graphite


Is this helpful? React 👍 or 👎 to let us know.

@NathanFlurry NathanFlurry force-pushed the 09-05-chore_ups_add_message_chunking_ups_protocol branch from c9cff29 to a294c2e Compare September 9, 2025 03:38
Copy link
Contributor

graphite-app bot commented Sep 9, 2025

Merge activity

  • Sep 9, 5:46 PM UTC: NathanFlurry added this pull request to the Graphite merge queue.
  • Sep 9, 5:47 PM UTC: CI is running for this pull request on a draft pull request (#2896) due to your merge queue CI optimization settings.
  • Sep 9, 5:49 PM UTC: Merged by the Graphite merge queue via draft PR: #2896.

graphite-app bot pushed a commit that referenced this pull request Sep 9, 2025
@graphite-app graphite-app bot closed this Sep 9, 2025
@graphite-app graphite-app bot deleted the 09-06-chore_ups_add_benches branch September 9, 2025 17:49
This was referenced Sep 15, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants